-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[DO NOT MERGE] TransferManager: DirectoryUploader & DirectoryDownloader #3288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: version-3
Are you sure you want to change the base?
Conversation
Detected 1 possible performance regressions:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is on the right path but we should break this down into chunks. I think you should first refactor file uploader/downloader to use the executor, and we should release that, then add directory upload/download after that.
@queue = Queue.new | ||
@max_threads = options[:max_threads] || 10 | ||
@pool = [] | ||
@running = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does running state default to true in the initializer for other implementations of this in ruby-concurrency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually yes - concurrent-ruby's implementation goes a bit deeper but I simplified our implementation. Links below (most common executors eventually inherits RubyExecutorService
:
- AbstractExecutorService: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb#L52-L54
- RubyExecutorService: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb#L70-L72
- SimpleExecutorService: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/lib/concurrent-ruby/concurrent/executor/simple_executor_service.rb#L98
module Aws | ||
module S3 | ||
# Raised when DirectoryDownloader fails to download objects from S3 bucket | ||
class DirectoryDownloadError < StandardError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By convention we were putting these in separate files right? If you want to promote the other two (multipart errors) to the files where they are used that's fine too, but let's stay consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I'm planning to separate them out.
# @option options [Integer] :thread_count (DEFAULT_THREAD_COUNT) | ||
def initialize(options = {}) | ||
@client = options[:client] || Client.new | ||
@thread_count = options[:thread_count] || DEFAULT_THREAD_COUNT |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thread count wouldn't matter in this class anymore right?
parts = upload_parts(upload_id, source, file_size, options) | ||
complete_upload(upload_id, parts, file_size, options) | ||
ensure | ||
@executor.shutdown if @executor.running? && @options[:executor].nil? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems odd. I would think the executor always exists because we provide a default - if nil we still provide a default. Also you would check if it were nil before you attempt to call a method on it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I was still trying to figure out executor but I have streamlined the executor flow a bit.
end | ||
|
||
download_opts = options.dup | ||
@bucket = bucket |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's odd to set instance state like bucket in a method. Wouldn't you pass bucket, errors, configuration, etc, down to relevant methods and return back errors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I'd agree. I'm not against this being a "one-shot" class - IE, for each directory download, you must create this object again - in which case bucket/destination would be set on initialization instead.
Either way though - setting it as a member variable here is a little weird.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I streamlined a bit here so there's less of floating instance states
downloads = process_download_queue(producer, downloader, download_opts) | ||
build_result(downloads) | ||
ensure | ||
@executor.shutdown unless @options[:executor] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should always assume an executor I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is an internal/private api - I think I'd agree that its reasonable to always require an executor to be provided, and then we don't ever shut it down
downloads = process_download_queue(producer, downloader, download_opts) | ||
build_result(downloads) | ||
ensure | ||
@executor.shutdown unless @options[:executor] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is an internal/private api - I think I'd agree that its reasonable to always require an executor to be provided, and then we don't ever shut it down
end | ||
|
||
download_opts = options.dup | ||
@bucket = bucket |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I'd agree. I'm not against this being a "one-shot" class - IE, for each directory download, you must create this object again - in which case bucket/destination would be set on initialization instead.
Either way though - setting it as a member variable here is a little weird.
download_attempts = 0 | ||
completion_queue = Queue.new | ||
queue_executor = DefaultExecutor.new | ||
while (object = producer.object_queue.shift) != :done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion on the Producer interface - I think the object_queue should be an internal detail here. I'd lean towards having this implement enumerable, so here you would just do:
producer.each do |object|
break if @abord_download
# rest of code
end
calling each
on the producer would start it (so no need to call run
) and would handle yielding objects from the internal object_queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the feedback! I made some changes on the interface and I really like it! It flows pretty well :)
|
||
upload_opts = options.dup | ||
@ignore_failure = upload_opts.delete(:ignore_failure) || false | ||
@errors = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as on the DirectoryDownloader - I think we need to decide if these classes are "one shot" or re-usable. If they are re-usable, then we cannot set state on the class.
Because the File uploader/downloader are NOT one shot, I guess I'd lean towards consistency here and having a DirectoryUploader
that is re-useable and then I think the easiest way to manage this is to create a private class DirectoryUpload
which is one-shot (ie, represents the state of a single directory upload.
As part of this - I think we would move management of the queue executor to the top level object- that is, the same queue executor would be shared across any concurrent DirectoryUploads (so that effectively a configured max_concurrent_file_uploads
setting would apply across all concurrent uploads started from the same DirectoryUploader).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made some heavy changes around this. I'm not sure if I want to abstract out one-shot DirectoryUpload yet. 🤔 open to any thoughts after you take a look at the revisions!
File.rename(opts[:temp_path], destination) if opts[:temp_path] | ||
ensure | ||
File.delete(@temp_path) if @temp_path && File.exist?(@temp_path) | ||
@executor.shutdown if @executor.running? && @options[:executor].nil? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is an issue with this - the FileDowloader
can be used to download multiple files right (ie, without creating a new one)? so here, if we shutdown the executor on the first download call, we'll never be able to call download again.
I think we'd probably want to call shutdown only when the downloader is GC'ed (and maybe at that point, we would just call kill on it instead?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After thinking a bit through - given that executor is always passed in at init - i think we can omit this/shutdown outside of the FileDownloader (at Object/TM level).
Experimental prototype
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
To make sure we include your contribution in the release notes, please make sure to add description entry for your changes in the "unreleased changes" section of the
CHANGELOG.md
file (at corresponding gem). For the description entry, please make sure it lives in one line and starts withFeature
orIssue
in the correct format.For generated code changes, please checkout below instructions first:
https://github.com/aws/aws-sdk-ruby/blob/version-3/CONTRIBUTING.md
Thank you for your contribution!